sync job works concurrently #190
Conversation
@hzxuzhonghu, pls do have a look.
```diff
-	go wait.Until(cc.worker, 0, stopCh)
+	for i := 0; i < workers; i++ {
+		go wait.Until(cc.worker, time.Second, stopCh)
```
Why do we use time.Second here but not 0 like the old logic?
@lminzhw I followed the way of the k8s controller-manager controllers. If we have many controllers, I think it's good to have some interval instead of zero. Maybe we can make this interval a config parameter as well. What do you say?
I browsed some controller code in k8s and realized that most k8s controllers use time.Second as the interval, plus multiple workers, to handle the large-scale problem. Seems good :)
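For reference, a minimal sketch of the pattern being discussed, assuming a hypothetical Controller with a worker method; wait.Until re-invokes worker with the given interval until stopCh closes (with a 0 interval, the function is re-invoked immediately after returning):

```go
package job

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

type Controller struct{}

// worker would drain this worker's queue in the real controller;
// when it returns, wait.Until calls it again after the interval.
func (cc *Controller) worker() {}

// Run starts the requested number of worker goroutines.
func (cc *Controller) Run(workers int, stopCh <-chan struct{}) {
	for i := 0; i < workers; i++ {
		// time.Second matches the k8s controller-manager convention
		go wait.Until(cc.worker, time.Second, stopCh)
	}
}
```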
@SrinivasChilveri it is not that simple to run multiple workers concurrently; the workers may conflict when operating on the same Job, I think. We have to prevent this.
Yeah, our controller can have multiple entries related to the same job in the queue. I have pushed the commit, pls do have a look.
@hzxuzhonghu, added a new commit for your comment. Pls do have a look.
```go
cc.jobsLock.Lock()
if cc.jobsMap[key] {
	// the job is being processed by some other thread
	cc.queue.AddRateLimited(req)
```
If it conflicts a few times, it'll take a really long time to retry the job. It's better to shard the requests to different channels/workers.
Also, this re-enqueue op can lead to requests being processed in unexpected orders, though I have no detailed scenario now.
@k82cn, @hzxuzhonghu, based on my understanding, sharding with a single queue will not solve this issue, so I have added a queueList sized by the number of workers and pushed the commit. Please do review the same.
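A minimal sketch of that queue-per-worker layout, assuming client-go's workqueue package; the Controller shape here is an assumption based on the discussion, not the PR's actual type:

```go
package job

import "k8s.io/client-go/util/workqueue"

type Controller struct {
	workers   uint32
	queueList []workqueue.RateLimitingInterface
}

// newQueueList builds one independent rate-limited queue per worker,
// so each worker goroutine owns exactly one queue.
func newQueueList(workers uint32) []workqueue.RateLimitingInterface {
	queues := make([]workqueue.RateLimitingInterface, workers)
	for i := range queues {
		queues[i] = workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	}
	return queues
}
```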
Force-pushed from 28cab1c to c00d12c.
```go
// Later we can remove this code if we want
if !cc.belongsToThisRoutine(key, count) {
	glog.Errorf("should not occur The job does not belongs to this routine key:%s, worker:%d...... ", key, count)
	cc.queueList[count].AddRateLimited(req)
```
Do not use AddRateLimited in this case; just hash requests by job into different queues, and have several workers to handle them.
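One way to do that hashing, as a sketch using the standard library's hash/fnv; the function name workerIndex is illustrative:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// workerIndex maps a job key deterministically onto one of the
// worker queues; identical keys always land on the same worker.
func workerIndex(key string, workers uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return h.Sum32() % workers
}

func main() {
	fmt.Println(workerIndex("default/job-1", 4))
	fmt.Println(workerIndex("default/job-1", 4)) // same index both times
}
```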
changed
some nits
```diff
-func (cc *Controller) processNextReq() bool {
-	obj, shutdown := cc.queue.Get()
+func (cc *Controller) processNextReq(count uint32) bool {
+	obj, shutdown := cc.queueList[count].Get()
```
nit: I would like to acquire the queue before this loop
changed
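A sketch of what that nit implies, with assumed types as in the earlier sketches: look the queue up once and pass it in, instead of indexing cc.queueList on every call:

```go
package job

import "k8s.io/client-go/util/workqueue"

type Controller struct {
	queueList []workqueue.RateLimitingInterface
}

func (cc *Controller) worker(i uint32) {
	queue := cc.queueList[i] // acquired once, before the loop
	for cc.processNextReq(queue) {
	}
}

func (cc *Controller) processNextReq(queue workqueue.RateLimitingInterface) bool {
	req, shutdown := queue.Get()
	if shutdown {
		return false
	}
	defer queue.Done(req)
	// ... sync the job for this request ...
	return true
}
```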
```go
	defer cc.queueList[count].Done(req)

	key := jobcache.JobKeyByReq(&req)
	// Later we can remove this code if we want
```
why add this?
Basically we are sharding the jobs and adding them into different queues, and this function just reads from those queues, so this condition should never be hit: a job request is placed into a queue based on its shard, so when it is read back it must belong to the same shard/queue/worker.
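In other words, the check is an invariant that holds by construction. A fragment-level sketch, reusing the illustrative workerIndex hash from the earlier example (not the PR's actual code):

```go
// belongsToThisRoutine should always be true for items read from
// queueList[count], because enqueueing already sharded by the same hash.
func (cc *Controller) belongsToThisRoutine(key string, count uint32) bool {
	return workerIndex(key, uint32(len(cc.queueList))) == count
}
```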
let's remove that
```go
}

// prevent multi threads processing the same job simultaneously.
cc.jobsLock.Lock()
```
This would not happen if you do sharding in the source.
agreed, will remove the code
```diff
@@ -241,7 +247,9 @@ func (cc *Controller) updatePod(oldObj, newObj interface{}) {
 		JobVersion: int32(dVersion),
 	}
 
-	cc.queue.Add(req)
+	key := vkcache.JobKeyByReq(&req)
+	i := cc.getWorkerID(key)
```
I think this can guarantee that worker threads will not process the same jobs.
yes
I'd like to see some tests on concurrently accessing the job cache.
Better to do that in a separate PR.
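For the record, a hedged sketch of the kind of test being requested; the toyCache here is a self-contained stand-in for the real job cache, and the point is to run such a test under go test -race:

```go
package cache

import (
	"fmt"
	"sync"
	"testing"
)

// toyCache stands in for the real job cache, just enough to
// demonstrate the shape of a concurrency test.
type toyCache struct {
	mu   sync.Mutex
	jobs map[string]int
}

func (c *toyCache) Update(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.jobs[key]++
}

func TestConcurrentJobCacheAccess(t *testing.T) {
	c := &toyCache{jobs: map[string]int{}}
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			// overlapping keys force goroutines onto the same entries
			key := fmt.Sprintf("default/job-%d", n%4)
			for j := 0; j < 1000; j++ {
				c.Update(key)
			}
		}(i)
	}
	wg.Wait()
}
```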
LGTM, leaving to @k82cn for the last eye.
@k82cn @hzxuzhonghu, pls do have a look and let me know if I need to update any more points.
Force-pushed from f33c5e6 to acf0193.
@k82cn, the PR is rebased.
@k82cn do I need to change anything else?
```go
}

// TODO we may need to make this sharding more proper if required
func (cc *Controller) getWorkerID(key string) workqueue.RateLimitingInterface {
```
This function doesn't do what its name describes.
changed the name
/lgtm
/lgtm
cmd/controllers/app/server.go (outdated)
```diff
 	queueController := queue.NewQueueController(kubeClient, kbClient)
 	garbageCollector := garbagecollector.New(vkClient)
 
 	run := func(ctx context.Context) {
-		go jobController.Run(ctx.Done())
+		go jobController.Run(opt.WorkerThreads, ctx.Done())
```
We already create jobController with opt.WorkerThreads; why does Run still need this parameter?
changed
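A sketch of the agreed shape, with assumed types: workers is stored on the controller at construction time, so Run only takes the stop channel.

```go
package job

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

type Controller struct {
	workers uint32
}

func (cc *Controller) worker(i uint32) { /* drain queueList[i] in the real controller */ }

// Run starts cc.workers goroutines, each bound to its own queue index.
func (cc *Controller) Run(stopCh <-chan struct{}) {
	for i := uint32(0); i < cc.workers; i++ {
		go func(n uint32) {
			wait.Until(func() { cc.worker(n) }, time.Second, stopCh)
		}(i)
	}
}
```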
```diff
@@ -21,6 +21,9 @@ import (
 	"sync"
 
+	"github.com/golang/glog"
+	"hash"
```
Keep this close to the other system libs.
changed
```go
}

// NewJobController create new Job Controller
func NewJobController(
	kubeClient kubernetes.Interface,
	kbClient kbver.Interface,
	vkClient vkver.Interface,
	workers uint32,

```
remove empty line
removed
```diff
-func (cc *Controller) worker() {
-	for cc.processNextReq() {
+func (cc *Controller) worker(i uint32) {
+
```
remove empty line.
removed
```go
		time.Second,
		stopCh)
	}(i)

```
remove empty line.
removed
```diff
@@ -65,7 +65,9 @@ func (cc *Controller) addJob(obj interface{}) {
 		glog.Errorf("Failed to add job <%s/%s>: %v in cache",
 			job.Namespace, job.Name, err)
 	}
-	cc.queue.Add(req)
+	key := vkcache.JobKeyByReq(&req)
```
Introduce a helper function for that.
done
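A fragment-level sketch of such a helper, using identifiers that appear in the diffs (vkcache.JobKeyByReq, the sharded queue list); the Request type and the getWorkerQueue name are assumptions, not the PR's actual code:

```go
// enqueue computes the job key once, picks the sharded queue for it,
// and adds the request, so event handlers no longer repeat this logic.
func (cc *Controller) enqueue(req Request) { // Request is a placeholder type
	key := vkcache.JobKeyByReq(&req)
	queue := cc.getWorkerQueue(key) // assumed: returns queueList[hash(key)%workers]
	queue.Add(req)
}
```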
@hzxuzhonghu @k82cn @TommyLike, fixed all the comments.
Force-pushed from bff14cf to eee74aa.
/lgtm
/approve
@TommyLike @hex108 any other suggestions?
ping @k82cn for approval
/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: hzxuzhonghu, k82cn, SrinivasChilveri

The full list of commands accepted by this bot can be found here. The pull request process is described here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment.
sync jobs concurrently
Fix: #93